package com.xdl.apitest.tabletest

import com.xdl.apitest.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, Tumble}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row

/**
 * Project: FlinkTutorial
 * Package: com.xdl.apitest
 * Version: 1.0
 *
 * Created by guoxiaolong on 2020-08-01-18:21
 */
object TimeAndWindowTest {
  def main(args: Array[String]): Unit = {
    //0.创建流执行环境，读取数据并转换成样例类
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.创建表执行环境
    val tableEnv = StreamTableEnvironment.create(env)

    // 2. 读取数据转换成流， map 成样例类
    val filePath: String = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(filePath)

    //map 成样例类型
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })

    //将流转换成表，直接定义时间字段
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'pt.proctime)

    // 1. Table API
    //1.1 Group Window 聚合操作
    val resultTable: Table = sensorTable
      .window(Tumble over 10.seconds on 'ts as 'tw)
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'tw.end)

    //1.2 Over Window 聚合操作
    val overResultTable: Table = sensorTable
      .window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'ow)
      .select('id, 'ts, 'id.count over 'pw, 'temperature.avg over 'ow)

    //2. SQL 实现
    // 2.1 Group Windows
    tableEnv.createTemporaryView("sensor", sensorTable)
    val resultSqlTable: Table = tableEnv.sqlQuery(
      """
        |select id, count(id), hop_end(ts, interval '4' second, interval '10' second)
        |from sensor
        |group by id, hop(ts, interval '4' second, interval '10' second)
        |""".stripMargin
    )

    // 2.2 Over Window
    val orderSqlTable: Table = tableEnv.sqlQuery(
      """
        |select id, ts, count(id), over w, abg(temperature) over w
        |from sensor
        |window w as (
        | partition by id
        | order by ts
        | rows between 2 preceding and current row
        |)
        |""".stripMargin
    )

    //打印输出
    //resultTable.toRetractStream[Row].print("agg")
    //overResultTable.toAppendStreamp[Row].print("over reuslt")
    //resultSqlTable.toAppendStreamp[Row].print("agg sql")
    orderSqlTable.toAppendStream[Row].print("order sql")


    //sensorReading.printSchema()
    env.execute("time and window test job")
  }

}
